Add OperationFuture class #142

Merged: 20 commits into googleapis:master on Jan 10, 2017

Conversation

@evaogbe evaogbe (Contributor) commented Dec 27, 2016

Implements the _OperationFuture class to be used when generating LRO code.

Used in googleapis/gapic-generator#891.

@codecov-io commented Dec 27, 2016

Current coverage is 97.55% (diff: 96.32%)

Merging #142 into master will decrease coverage by 0.41%

@@             master       #142   diff @@
==========================================
  Files             9         10     +1   
  Lines           640        737    +97   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits            627        719    +92   
- Misses           13         18     +5   
  Partials          0          0          

Powered by Codecov. Last update a3a290a...e269ad0

@lukesneeringer lukesneeringer (Contributor) left a comment

This is really good. I have a few things that I would like to poke at. :-)


return self._last_operation

def _poll(self, timeout=None):
@lukesneeringer lukesneeringer (Contributor) Dec 28, 2016

One useful change I might recommend here is for the polling timer to "back off". It would be pretty easy for someone to fall into the trap of calling result(timeout=None) on an operation that will take (for example) five minutes to complete, and this method would end up polling every second -- meaning 300 API requests.

While there are several ways to approach this, I would recommend a recursive algorithm where the wait time is a parameter:

def _poll(self, timeout=None, increment=1):
    """Poll the LRO server to see if the operation has completed. If not,
    sleep for `increment` seconds and then try again.
    """
    # Sanity check: if we have exhausted our timeout, error out.
    if timeout is not None and timeout < 0:
        raise futures.TimeoutError()

    # Poll the server. If the operation has completed, return the Operation
    # instance with the result.
    if self._get_operation().done:
        return self._last_operation

    # Okay, we are not done yet. Wait a little while and poll again, doubling
    # the wait up to a 30-second cap and decrementing any finite timeout.
    time.sleep(increment)
    return self._poll(
        increment=min(30, increment * 2),
        timeout=timeout - increment if timeout is not None else None,
    )

This will -- assuming I implemented it correctly ;-) -- poll the server at the following increments: 1, 2, 4, 8, 16, 30, 30, 30, 30...

Contributor

We already have an exponential backoff algorithm for retrying API calls with transient errors. We should reuse that.

@landrito landrito

In Ruby and Node I opted to use exponential backoff. All gax libraries have a BackoffSettings class used for retryable API calls; I used this class to specify the backoff settings used for operation polling. Here is an example of it being used for retryable API calls.

Contributor

We already have an exponential backoff algorithm for retrying API calls with transient errors. We should reuse that.

Oh. Yeah, definitely use something already baked. :-)

Contributor Author

Added backoff

Contributor

Would suggest reusing (or refactoring to make reusable) the code in api_callable.py that @landrito linked to. I suspect you should be able to use the _retryable function as-is, specifying the numerical parameters as a RetryOptions object.
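
(For reference, a rough sketch of what configuring that might look like. The BackoffSettings/RetryOptions field names are as I recall them from google.gax, and the values are purely illustrative rather than the polling defaults that should ship:)

from google.gax import BackoffSettings, RetryOptions

# Hypothetical polling configuration: start at 1s between polls, back off by
# 2x up to a 30s ceiling, and give up after 10 minutes overall.
_POLLING_BACKOFF = BackoffSettings(
    initial_retry_delay_millis=1000,
    retry_delay_multiplier=2,
    max_retry_delay_millis=30000,
    initial_rpc_timeout_millis=30000,
    rpc_timeout_multiplier=1,
    max_rpc_timeout_millis=30000,
    total_timeout_millis=600000,
)
_POLLING_OPTIONS = RetryOptions(
    retry_codes=[],  # polling repeats on "not done yet", not on status codes
    backoff_settings=_POLLING_BACKOFF,
)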

Contributor Author

_retryable doesn't support infinite timeouts as-is. It gets really messy trying to reuse it.

Contributor

Could you refactor the generic (incl. infinite timeout support) exponential backoff algorithm into its own module retry and then import retry in both here and in api_callable?

Contributor Author

@geigerj Refactored into a retry module, PTAL

import unittest2

from fixture_pb2 import Simple
Contributor

Can you explain to me what this is?


"""
return _from_any(self._metadata_type, self._first_operation.metadata)

def last_operation_data(self):
Contributor

I think the docstring here is probably wrong? It says that this method will block if the first call is not done yet, but self._last_operation is set by the constructor (and the argument to the constructor is required). As best as I can tell, there is not an actual situation where this method will block.

I am not sure whether the correct solution is to amend the comment or the code.

Also, do we want a public-facing method that is not part of the c.f.Future interface? I can think of several reasons why maybe we do; I am just bringing it up to ensure it is an explicit decision.

Contributor

Seems like the docstring is wrong. That's my fault, since I added this line in the design doc.

IIUC, we are waiting until the first call to get the longrunning.Operation object is completed before we return an OperationFuture wrapper, so the condition regarding the first call must be satisfied already.

@garrettjonesgoogle I copied this comment from your Java design. Are we missing anything here regarding "Blocks if the first call isn't done yet"?

Contributor

@lukesneeringer Yes, we do want these extra methods that are not part of c.f.Future. They provide access for power users to the actual meta-API mechanism that sits behind this wrapper.

Contributor

IIUC, we are waiting until the first call to get the longrunning.Operation object is completed before we return an OperationFuture wrapper, so the condition regarding the first call must be satisfied already.

That seems right. Regardless of the nitty-gritty, it is definitely the case that by the time this class is instantiated, there is at least one Operation.

Contributor Author

Fixed docstring


threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start()

def operation_name(self):
Contributor

My inquiry about the blocking component in last_operation_data (below, sorry) applies to this as well.

Contributor

See my response there

"""Issues OperationsApi.get_operation and returns value of Operation.done."""
return self._get_operation().done

def add_done_callback(self, done_clbk):
Contributor

A few thoughts on this:

First, while it is a bit of a nitpick, I would ask that the positional argument be spelled fn to match c.f.Future and asyncio. It is possible to pass positional arguments using their keyword names, and many people do.

Second, I am having a debate with myself over the thread here. Part of me thinks that this method should return the Thread object (so that the programmer would have the option of calling thread.join() later). On the other hand, I do not particularly want to be tied to this exact implementation; we might want to change this to use gevent or something. Since c.f.Future and asyncio do not expect this method to return anything, we should probably leave it.

Finally, it is worth noting that the threading module is relatively poorly implemented in Python and is generally disfavored (with multiprocessing being preferred). If this is something you can change quickly, it would probably be preferable. That said, I also think it is not the end of the world if that is time-consuming, especially since @bjwatson has already flagged this spot for a potential improvement as we discover how it is actually used.

Contributor

👍 to fn.

I also agree with switching to multiprocessing, since Luke says it's preferred among the Python community. However, let's not expose our implementation to the caller. We won't necessarily know what the thread or subprocess is at the time of calling add_done_callback(); that could be dynamically determined at the time the callback is executed. The primary thread would need to use a monitor object or something like that if it needs to know when the callback has been triggered.

Contributor

Another reason not to expose the implementation to the caller is that there are several reasons why we might change it in the future (for example, by implementing our own event loop as discussed elsewhere, although that is not the only reason that might happen).

Contributor Author

  1. now fn
  2. still hiding implementation details
  3. using mp

@landrito landrito left a comment

This is close!

'Could not convert {} to {}'.format(
any_pb.__class__.__name__, pb_type.__name__))

return pb_type.FromString(any_pb.value)
@landrito landrito Dec 28, 2016

It would be best to use the Any.Unpack instance method from here, i.e.:

def _from_any(pb_type, any_pb):
    msg = pb_type()
    if not any_pb.Unpack(msg):
        raise TypeError(
            'Could not convert {} to {}'.format(
                any_pb.__class__.__name__, pb_type.__name__))

    return msg
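
(For illustration, a minimal round trip through Any; this is a sketch rather than code from the PR, and it assumes the Pack/Unpack helpers that the protobuf Any well-known type exposes in Python:)

from google.protobuf import any_pb2
from google.protobuf import duration_pb2

any_pb = any_pb2.Any()
any_pb.Pack(duration_pb2.Duration(seconds=30))

# _from_any allocates a fresh Duration and unpacks the Any payload into it.
duration = _from_any(duration_pb2.Duration, any_pb)
assert duration.seconds == 30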

Contributor

TIL! Thanks @landrito

Contributor Author

Switched to Unpack

"""Returns the value of Operation.metadata from the initial Operation object
returned from the first call. Blocks if the first call isn't done yet.
"""
return _from_any(self._metadata_type, self._first_operation.metadata)


From what I have seen, changes to metadata are meaningful. For instance, the AsyncRecognizeMetadata message contains a percent-complete field. This method would be more usable if it returned the value of the _last_operation metadata.

Contributor

I agree with you. I copied this semantic from @garrettjonesgoogle's design for Java, and asked him about it in one of my review comments here.



call_options (google.gax.CallOptions, optional): the call options
that are used when reloading the operation.
"""
self._first_operation = operation


I don't think _first_operation is a needed property. The _last_operation property can be used wherever this is used (per my comment on the metadata method), and the operation name will not change.

Contributor

Do we anticipate a situation where we might, in the future, have data that mutates between the first operation and the most recent one, where we might want to preserve the first for reference?

On the other hand, even if we do, we can always re-add _first_operation then. It is one line of code. :-)

Reply:

If a case arises, the user can just get the needed information before polling.

Contributor Author

Removed _first_operation

except Exception as ex: # pylint: disable=broad-except
_LOG.exception(ex)

threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start()


I think this will cause many more API calls than are necessary, since each thread will do its own polling. A solution would be to have the logic for this method be as follows:

  1. If the operation is done, execute the callback and return.
  2. Otherwise, add the callback to a list of callbacks.
  3. If a polling thread has not been started, start one that calls all of the callbacks in the list once polling completes.

Contributor

Good point!

Regarding #1, we might still want to execute the callback in a separate thread. The expected behavior of this method is that it just registers the callback and returns right away, and the callback might have a non-trivial running time.

The design for having just one polling loop makes a lot of sense.
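
(A minimal sketch of the single-polling-loop registration being described; the names _done_clbks, _try_callback, _poll, and _process are illustrative and predate the code that actually landed later in this PR:)

import collections
import multiprocessing as mp

class _OperationFuture(object):
    def __init__(self, operation, client):
        self._operation = operation
        self._client = client
        self._done_clbks = collections.deque()
        self._process = None

    def add_done_callback(self, fn):
        """Register fn to be run once the operation completes."""
        if self._get_operation().done:
            # Already finished: run the callback right away (or hand it to a
            # worker, per the discussion above).
            _try_callback(self, fn)
            return
        # Otherwise queue it and make sure exactly one poller is running.
        self._done_clbks.append(fn)
        if self._process is None:
            self._process = mp.Process(target=self._execute_clbks)
            self._process.start()

    def _execute_clbks(self):
        """Poll until the operation is done, then run each queued callback once."""
        self._poll()
        while self._done_clbks:
            _try_callback(self, self._done_clbks.popleft())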

@bjwatson bjwatson (Contributor) left a comment

Looking good! I have several comments, but this looks close to done.

Please note that one of my comments refers to rebuilding the LRO GAPIC code and including it in this PR.

"""Issues OperationsApi.get_operation and returns value of Operation.done."""
return self._get_operation().done

def add_done_callback(self, done_clbk):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 to fn.

I also agree with switching to multiprocess, since Luke says its preferred among the Python community. However, let's not expose our implementation to the caller. We won't necessarily know what the thread or subprocess is at the time of calling add_done_callback(); that could be dynamically determined at the time of executing the callback. The primary thread would need to use a monitor object or something like that if it needs to know when the callback has been triggered.

"""
return _from_any(self._metadata_type, self._first_operation.metadata)

def last_operation_data(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the docstring is wrong. That's my fault, since I added this line in the design doc.

IIUC, we are waiting until the first call to get the longrunning.Operation object is completed before we return an OperationFuture wrapper, so the condition regarding the first call must be satisfied already.

@garrettjonesgoogle I copied this comment from your Java design. Are we missing anything here regarding "Blocks if the first call isn't done yet"?

"""
return _from_any(self._metadata_type, self._first_operation.metadata)

def last_operation_data(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lukesneeringer Yes, we do want these extra methods that are not part of c.f.Future. They provide access for power users to the actual meta-API mechanism that sits behind this wrapper.


return self._last_operation

def _poll(self, timeout=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have an exponential backoff algorithm for retrying API calls with transient errors. We should reuse that.

return pb_type.FromString(any_pb.value)


class ResultError(GaxError):
Contributor

This doesn't seem to add much to justify another error type. We should just wrap the operation_error in a GaxError (as the cause).

@lukesneeringer lukesneeringer (Contributor) Dec 28, 2016

I disagree. :-) I think there could be a case where a developer wants to catch result errors but does not want to catch other ways that a GaxError might be thrown, now or in the future. The subclass makes this really easy, and is much cleaner than catching GaxError, checking cause, and then re-raising if you do not recognize it.

(And nothing stops someone from catching GaxError and getting these along with it.)

Contributor

I agree with @bjwatson here

Contributor Author

Going with @bjwatson for now. Currently a subclass is YAGNI.

Contributor

Right. The only GaxError subclass we've defined so far is RetryError, and that's only because it pertains to a specific client-side feature that we implement (automatic retry for transient errors on idempotent methods). Errors associated with LRO will be more along the lines of server-side errors we get from synchronous methods, which we just wrap in plain GaxError.

Contributor

I still disagree, but am entirely happy to be outvoted. :-)

"""If last Operation's value of `done` is true, returns false;
otherwise, issues OperationsApi.cancel_operation and returns true.
"""
if not self._last_operation.done:
Contributor

This should be self._get_operation() to know for sure whether it's done.


def cancelled(self):
"""Return True if the call was successfully cancelled."""
return self._cancelled
Contributor

Not necessarily. We need to check the _client to know for sure. See the cancel_operation() documentation.

Therefore, we should only set self._cancelled when the Operations API tells us that it has been cancelled; its only purpose is to cache that fact once we know for sure.

Contributor

I do agree here.

self._client.cancel_operation(self._last_operation.name)
self._cancelled = True

return self._cancelled
Contributor

See comment below for cancelled(). The return value from this method just indicates whether we attempted to do the cancellation. However, the cancellation itself is asynchronous, so the cancelled() call needs to check if it was actually cancelled.

@lukesneeringer lukesneeringer (Contributor) Dec 28, 2016

Hmm, are you sure? The asyncio.Future.cancel() documentation (somewhat ambiguously) suggests the implementation @eoogbe has now. And an upside to her implementation is that the cancel request is guaranteed to be non-blocking.

There is also a potential race condition: what if you send cancel, then ask whether it was cancelled, but the cancellation is still pending? Under what you suggest, we would return False there, but True is really the proper response.

OTOH: An alternative reading is that "change the Future's state to cancelled" assumes actual cancellation success, and that we should return False if unable.

I think the only way to do this if we are trying to report on actual cancellation is if self._client.cancel_operation actually returns True or False based on its ability to cancel. If it does, we should set self._cancelled to that value and then return it. If it does not, then I think we should probably just return True.

Contributor Author

I was following c.f.Future.cancel(), which says to return True if cancelling, not just when cancelled. (In fact, when it's already cancelled, I return False.) The cancelled method has the semantics you're referring to, so I should change that one.

@lukesneeringer lukesneeringer (Contributor) Dec 29, 2016

The c.f.Future.cancel() docs actually seem like a stronger case for @bjwatson's position to me.

I think where I am at this point is that if you can get "cancelable" status with no race condition, return that. If you have to poll to see if it was actually cancelled, I think that would be a mistake and returning True would be safer.

Contributor

@lukesneeringer I think we're all in agreement. I'm completely okay with the implementation that @eoogbe has for this method (cancel()); it matches the c.f.Future doc and it's non-blocking.

I'm just not sure it's worth caching self._cancelled here, because the cancelled() method needs to check the Operations API to see if the cancellation actually happened. If the cancelled() method finds out that it did, then it can cache a self._cancelled value.

Contributor

Ooooh, I see. You are not saying that the method should not naively return True; you are saying it should not save self._cancelled. I agree.
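
(Summarizing where this thread lands, as a sketch using the attribute names quoted elsewhere in this PR; the exact code that merged may differ:)

def cancel(self):
    """Request cancellation. Non-blocking, per the c.f.Future semantics."""
    if self.done():
        return False
    self._client.cancel_operation(self._last_operation.name)
    return True

def cancelled(self):
    """Ask the Operations API whether the cancellation actually happened."""
    self._get_operation()
    return (self._last_operation.HasField('error') and
            self._last_operation.error.code == code_pb2.CANCELLED)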

start_time = time.time()

while timeout is None or time.time() < start_time + timeout:
if self._get_operation().done:
Contributor

This could be simplified to if done().

@lukesneeringer lukesneeringer (Contributor) Dec 28, 2016

if self.done():

:-)

@@ -54,6 +54,7 @@
'ply==3.8',
'protobuf>=3.0.0, <4.0dev',
'oauth2client>=2.0.0, <4.0dev',
'googleapis-common-protos>=1.5.0',
Contributor

Add an upper-bound: googleapis-common-protos>=1.5.0, <2.0dev

@bjwatson (Contributor)

@dhermes @daspecster @tseaver I'd like to invite your feedback as well. This is the interface we're developing to improve LRO usability. For example, Speech's google.cloud.gapic.speech.v1beta1.speech_client.SpeechClient.async_recognize() will return this new OperationFuture object, rather than the underlying gRPC google.longrunning.Operation object.

import time
import threading
from concurrent import futures
from .errors import GaxError
Contributor

Why relative imports ever?

Contributor

We should change this to from google.gax.errors import GaxError.

Contributor

Good catch. @eoogbe our Python style is to always use absolute imports. See https://google.github.io/styleguide/pyguide.html#Imports. I'm sure there's a PEP that says something similar.

Contributor

Yes, absolute imports are almost always preferred. (There are some rare corner cases.)



def __init__(self, operation, client, result_type, metadata_type,
call_options=None):
"""Constructor.
Contributor

I am with @lukesneeringer here. An alternative would be to have a non-public factory constructor that manually mucks with instance attributes that aren't public. For example, you'd "never" expect a user to pass metadata_type to the constructor (though we upstream library-dev "users" would be fine doing that).

target=_execute_clbks, args=(self,))
self._process.start()
elif not self._process.is_alive() and self._last_operation.done:
_execute_clbks(self)


Be careful here. This can cause some callbacks to be called twice, since _execute_clbks always calls every callback in the list.

For example:
Register cb A.
Register cb B.
Polling completes.
Run cb A and cb B.
Some time passes.
Register cb C.
Run cb A, cb B, cb C.

def cancelled(self):
"""Return True if the call was successfully cancelled."""
self._get_operation()
return self._last_operation.HasField('error') and \


nit: using a backslash line continuation is a little unusual; prefer something like the following:

return (self._last_operation.HasField('error') and
        self._last_operation.error.code == code_pb2.CANCELLED)

@evaogbe evaogbe (Contributor Author) commented Jan 3, 2017

PTAL

(Sorry about spamming with the individual comments. I didn't realize that was the convention.)




def __init__(self, operation, client, result_type, metadata_type,
call_options=None):
"""Constructor.
@geigerj geigerj (Contributor) Jan 3, 2017

@lukesneeringer Isn't it strange that the GAPIC docstrings will reference an underscore-prefixed class in GAX (i.e., the generated client will have a method documented as returning a :class:`google.gax._OperationFuture` instance)?

call_options (google.gax.CallOptions, optional): the call options
that are used when reloading the operation.
"""
self._last_operation = operation
Contributor

Why _last_operation instead of just _operation? The "last" implies to me that it might hold over time a sequence of operations, rather than a single one whose status is being updated.

client (google.gapic.longrunning.operations_client.OperationsClient):
a client for the long-running operation service.
result_type (type): the class type of the result.
metadata_type (type): the class type of the metadata.
Contributor

It's optional for an Operation to have metadata (see: https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto#L90). Should this be correspondingly optional, or else have documented behavior for calling the metadata() method when metadata_type is None?

Contributor Author

It's not optional in Ruby, but it makes sense to make it optional here. I'll add a check for the metadata being None, and keep the TypeError for when metadata_type is not given but there is metadata.

Reply:

For Ruby and Node, the user will specify the response or metadata type as google.protobuf.Empty, as shown in the Cloud Functions GAPIC yaml.

Contributor

Understood. Not necessary to change the logic here if metadata will just be an Empty instance in that case.
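
(A sketch of the None-handling described above; it assumes the _metadata_type attribute and the _from_any helper from this PR, and is illustrative rather than the exact code that landed:)

def metadata(self):
    """Return the deserialized Operation.metadata, or None if it is unset."""
    if not self._operation.HasField('metadata'):
        return None
    if self._metadata_type is None:
        raise TypeError('Operation has metadata, but no metadata_type was given.')
    return _from_any(self._metadata_type, self._operation.metadata)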

self._process = multiprocessing.Process(
target=_execute_clbks, args=(self,))
self._process.start()
mp.Process(target=self._execute_clbks).start()

Make sure that self._process is getting assigned (note that Process.start() returns None, so this needs to be two statements):

self._process = mp.Process(target=self._execute_clbks)
self._process.start()

self._process = multiprocessing.Process(
target=_execute_clbks, args=(self,))
self._process.start()
mp.Process(target=self._execute_clbks).start()
elif not self._process.is_alive() and self._last_operation.done:

I suspect that there is a race condition here. If the _execute_clbks process gets blocked after the loop in line 654 finishes, the fn could be added to the queue before the _execute_clbks process is killed but after the callback pool is made, causing the fn to be lost.

Contributor Author

I changed add_done_callback to use a list, so that it more closely matches the c.f.Future spec. I think that gets rid of the race condition as well.

Reducing concurrency simplifies the implementation and makes it closer
to the expected behavior of c.f.Future.
@evaogbe evaogbe (Contributor Author) commented Jan 4, 2017

PTAL

Also, not sure why Travis fails on 3.5. I think it may be a bug. Can I safely ignore?

@lukesneeringer (Contributor)

Also, not sure why Travis fails on 3.5. I think it may be a bug. Can I safely ignore?

No, but I will help you fix it. :-)

Trying to fix Travis CI error:
"AttributeError: '_NamespacePath' object has no attribute 'sort'"
Still trying to fix the Travis CI error.
Still fixing Travis CI error.
This should actually fix the Travis CI error.
@evaogbe evaogbe (Contributor Author) commented Jan 5, 2017

The Travis CI error appears to be an issue with pip 9.0.1. I think it's happening because google is a namespace package and google-gax is not. Not sure if this is fixable without downgrading/waiting on an upgrade from pip.

from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings,
config, errors, PageIterator, ResourceIterator,
RetryOptions)
from google.gax.retry import add_timeout_arg, retryable
Contributor

Style: from google.gax import retry

(Note: Google style is only to import modules and packages; in this project, we additionally allow classes.)



def _has_timeout_settings(backoff_settings):
return backoff_settings.rpc_timeout_multiplier is not None and \
Contributor

Style: use parens for line continuation rather than backslashes
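
(For example; the second operand is a guess at what the truncated line above goes on to check, included only to complete the illustration:)

def _has_timeout_settings(backoff_settings):
    return (backoff_settings.rpc_timeout_multiplier is not None and
            backoff_settings.max_rpc_timeout_millis is not None)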


raise exc

return _retryable_without_timeout(inner, retry_options, **kwargs)
@geigerj geigerj (Contributor) Jan 5, 2017

This looks to me like it retries inner, and inner itself retries with no delay; you effectively have nested retry while loops. So if I understand right, it'll do something like:

call1: timeout 10, delay 1
call2: timeout 20, delay 1
call3: timeout 40, delay 1
call4: timeout 10, delay 1.5
call5: timeout 20, delay 1.5
call6: timeout 40, delay 1.5
call7: timeout 10, delay 2.25
...

instead of

call1: timeout 10, delay 1
call2: timeout 20, delay 1.5
call3: timeout 40, delay 2.25
...

I think this might be more easily done with a single retryable function, essentially identical to the one previously in api_callable, with conditionals only to update the timeout where applicable. Sort of like (not real Python):

def retryable(...):
  def inner(...):
    ...
    timeout = {timeout setting} if {timeout setting} else None
    now = time.time()
    deadline = now + {total_timeout} if {total_timeout} else None
    while deadline is None or now < deadline
      to_call = a_func
      try:
        if timeout:
         to_call = add_timeout_arg(to_call, ...)
        to_call()
      except:
        [update delay]
        if timeout:
          [update timeout]
     ...

Will something like that work here?
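
(My attempt at filling in that pseudocode as a runnable sketch. The BackoffSettings field names and the signatures for add_timeout_arg, _has_timeout_settings, and RetryError are assumptions based on the surrounding discussion, and this is not the code that finally merged:)

import time

def retryable(a_func, retry_options):
    """Wrap a_func so it is retried with exponential backoff on failure."""
    def inner(*args, **kwargs):
        backoff = retry_options.backoff_settings
        delay = backoff.initial_retry_delay_millis / 1000.0
        has_timeout = _has_timeout_settings(backoff)
        timeout = (backoff.initial_rpc_timeout_millis / 1000.0
                   if has_timeout else None)
        # total_timeout_millis may be None, meaning "retry forever".
        deadline = (time.time() + backoff.total_timeout_millis / 1000.0
                    if backoff.total_timeout_millis is not None else None)
        exc = None

        while deadline is None or time.time() < deadline:
            to_call = a_func
            if has_timeout:
                to_call = add_timeout_arg(a_func, timeout)
            try:
                return to_call(*args, **kwargs)
            except Exception as exception:  # real code: only retryable codes
                exc = exception
                time.sleep(delay)
                # Update the delay, and the per-call timeout if applicable,
                # once per failed attempt (no nested retry loops).
                delay = min(delay * backoff.retry_delay_multiplier,
                            backoff.max_retry_delay_millis / 1000.0)
                if has_timeout:
                    timeout = min(timeout * backoff.rpc_timeout_multiplier,
                                  backoff.max_rpc_timeout_millis / 1000.0)

        raise RetryError('Retry total timeout exceeded', cause=exc)

    return inner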

Contributor Author

Good catch. I was having trouble finding a clean way to achieve this, but it looks like I have to check conditionals everywhere to get it to work. Fixed now.

Prevents double while loop when retrying with timeout.
@@ -0,0 +1,279 @@
# Copyright 2016 Google Inc. All rights reserved.
Contributor

We need to switch this to BSD to conform to the rest of GAX. This is supposed to be configured in the GAPIC config here, but I expect it's not working because Python isn't on MVVM and the license is hardcoded into the template files.

In a separate toolkit PR, could you update the Python templates to select the correct license based on that GAPIC config field?

Contributor

Could you update this manually until we can regenerate the package in api-client-staging? (Or, preferably, update the package in api-client-staging and copy it back into here.)

@evaogbe evaogbe (Contributor Author) commented Jan 6, 2017

@lukesneeringer @landrito @geigerj LGTY?

@landrito landrito commented Jan 6, 2017

Looking right now!

from .errors import RetryError
from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings,
config, errors, PageIterator, ResourceIterator,
RetryOptions, retry as rt)
Contributor

nit: I think retry is more readable than rt.

Contributor Author

retry is already a variable name

Contributor

Ah, OK. This is fine, then.

Contributor

Use the same system that you use to solve a name collision with the stdlib: retry_.

Contributor

+1. I'd add also that I'd prefer that, if there's a naming collision between a module and a local variable, the local variable rather than the module be renamed.


my_callable = retry.retryable(mock_call, retry_options)

self.assertRaises(errors.RetryError, my_callable)
Contributor

This is a different condition than what the old test checked. Ideally, we should check that both

  • a RetryError is raised
  • the error has the right cause


@mock.patch('google.gax.config.exc_to_code')
@mock.patch('time.time')
def test_retryable_with_timeout(self, mock_time, mock_exc_to_code):
Contributor

Looks like some of the old tests didn't make it into this file?

  • function raises CustomException -> CustomException is set as the RetryError cause
  • function has a timeout and throws to_attempt retryable codes before succeeding -> call count should be to_attempt + 1
  • function does not retry when the status code list is nonempty in the Retry configuration, but the status code returned by the function isn't in the list

Contributor Author

old retry tests added back in, PTAL


self._process = None

def cancel(self):
"""If last Operation's value of `done` is true, returns false;
Contributor

Nit: for consistency with elsewhere in GAX, don't indent the docstring relative to the """.

@@ -4,7 +4,7 @@ pytest-cov>=1.8.1
pytest-timeout>=1.0.0
unittest2>=1.1.0
grpcio-tools>=1.0.0
google-auth>=0.2.0
google-auth>=0.5.0
Contributor

How is this change connected to this PR?

Contributor Author

It was part of trying to fix the Travis CI error. Should I change it back?

Contributor

Yes, let's revert the changes made when working on the Travis error unless they're related to the rest of this PR.

self._process = mp.Process(target=self._execute_clbks)
self._process.start()
elif not self._process.is_alive() and self._operation.done:
_try_callback(self, fn)

I think this would be better written as follows, to avoid storing callbacks when it is unnecessary.

if self._process.is_alive() and self._operation.done:
    _try_callback(self, fn)
    return

self._done_clbks.append(fn)
if self._process is None:
    self._process = mp.Process(target=self._execute_clbks)
    self._process.start()

Contributor Author

I think it ends up looking messier because I have to check if the process is None twice:

if self._process is not None and not self._process.is_alive() and self._operation.done:
    _try_callback(self, fn)
    return

self._done_clbks.append(fn)
if self._process is None:
    self._process = mp.Process(target=self._execute_clbks)
    self._process.start()

Reply:

Ah good point! Maybe this?

if self._process is None:
    self._done_clbks.append(fn)
    self._process = mp.Process(target=self._execute_clbks)
    self._process.start()
elif not self._process.is_alive() and self._operation.done:
    _try_callback(self, fn)
else:
    self._done_clbks.append(fn)

_try_callback(self, fn)

def operation_name(self):
"""Returns the value of Operation.name from the last call to

Specifying which call the operation name comes from makes it seem like the operation name can change between calls. You should remove this specification to avoid confusion.

"""
return self._operation

def _get_operation(self):

This might be a useful method to expose publicly to allow users to implement their own polling.

Contributor Author

Polling can already be implemented in terms of the public methods done and last_operation_data.

Reply:

Oh I see. To me, done -> last_operation_data seems a bit of a roundabout way for a user to simply ask the server for the current operation. If you see no problem with that, then just leave as is. :)

Contributor Author

If the user is polling, they will check whether the operation is done first anyway. Is there a use case where the user would implement polling without checking done?

Though perhaps it'd be better to implement last_operation_data in terms of _get_operation. I don't see why the user would want the stale data from the supposed last operation. @bjwatson Does this make sense to change?


return self._operation

def _done_check(self, _timeout):
@landrito landrito Jan 6, 2017

A couple of things I'm unsure about:

I think this would be better as a nested function inside _poll, but I'm curious whether it is more pythonic to use a nested function or to have it be a private instance method as you have done.

I would use _ instead of _timeout for the parameter name, since the parameter is unused, but I am also unsure whether that convention is pythonic.

self._poll()

for done_clbk in self._done_clbks:
_try_callback(self, done_clbk)

Is there a reason why the callbacks are still being stored after they are run?

@@ -673,3 +670,5 @@ def _execute_clbks(self):

for done_clbk in self._done_clbks:
_try_callback(self, done_clbk)

self._done_clbks = []
@landrito landrito Jan 6, 2017

I think it's bad practice (and can cause bad things to happen) to update a list that is being iterated over in a for loop. This would be better as a queue that is shifted for each callback, which both empties the list after all callbacks are called and avoids mutating a list while iterating over it.

In the _OperationFuture constructor:

self._done_clbks = collections.deque()

In this function:

while self._done_clbks:
    done_clbk = self._done_clbks.popleft()
    _try_callback(self, done_clbk)

edit: accidentally deleted this comment; this is it reposted.

Contributor Author

Now using a deque, PTAL

Contributor

I think it's bad practice (and also causes bad things to happen) to update a list that is being iterated on in a for loop.

It actually raises an exception in Python.

@landrito landrito dismissed their stale review January 9, 2017 21:03

I don't see any needed changes. I will just dismiss my "requested changes" review as opposed to providing an LGTM since I am not one of the main Python reviewers for the team.

@lukesneeringer (Contributor)

I am shifting to approved. This is good enough and I do not think that you will have an easy time fixing the 3.5 Travis issue.

@geigerj geigerj (Contributor) left a comment

Looks good overall! Just one optional nit and one tiny test comment.



my_callable = retry.retryable(mock_call, retry_options)

try:
@geigerj geigerj (Contributor) Jan 10, 2017

even better:

try:
  my_callable()
  self.fail('Should not have been reached')
except ...
...

This gets the best of both the old and your new tests: it succeeds only if there's an exception and it's the right one.

(here and above)
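
(Spelled out, as a sketch: CustomException is the test double mentioned earlier in this review, and the cause attribute on RetryError is assumed from the errors module discussion; the exact assertions are illustrative:)

try:
    my_callable()
    self.fail('Should not have been reached')
except errors.RetryError as exc:
    # The retry wrapper should surface the underlying error as the cause.
    self.assertIsInstance(exc.cause, CustomException)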

@evaogbe evaogbe merged commit f773dcf into googleapis:master Jan 10, 2017
@evaogbe evaogbe deleted the op-future branch January 10, 2017 20:20